メインコンテンツへスキップ
バージョン: 5.0

RocketMQ Connect クイックスタート

クイックスタート

このチュートリアルでは、スタンドアロンモードでRocketMQコネクタのサンプルプロジェクト「rocketmq-connect-sample」を開始して、コネクタの原則を理解するのに役立てていただきます。サンプルプロジェクトには、ソースファイルからデータを読み取ってRocketMQクラスタに送信するソースコネクタが用意されています。また、RocketMQクラスタからメッセージを読み取ってターゲットファイルに書き込むシンクコネクタも提供しています。

1. 準備: RocketMQの開始

  1. Linux/Unix/Mac
  2. 64ビットJDK 1.8以上;
  3. Maven 3.2.x以上;
  4. RocketMQを開始します。 RocketMQ 4.xRocketMQ 5.x 5.xバージョンを使用できます;
  5. ツールを使用してRocketMQメッセージの送受信をテストします。

ここでは、環境変数 NAMESRV_ADDR を使用して、ツールクライアントに RocketMQ の NameServer アドレスを localhost:9876 として知らせます。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意: RocketMQには、トピックとグループを自動的に作成する機能があります。メッセージを送信またはサブスクライブするとき、対応するトピックまたはグループが存在しない場合、RocketMQは自動的にそれらを作成します。したがって、事前にトピックとグループを作成する必要はありません。

2. コネクタランタイムの構築

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

注意: このプロジェクトにはすでにrocketmq-connect-sampleのコードがデフォルトで含まれているため、rocketmq-connect-sampleプラグインを別途構築する必要はありません。

3. スタンドアロンモードでコネクタワーカーを実行する

構成の変更

connect-standalone.confファイルを編集して、RocketMQ接続アドレスとその他の情報を構成します。詳細は 9. 設定ファイルの指示 を参照してください。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

スタンドアロンモードでは、RocketMQ Connectは同期チェックポイント情報をローカルファイルディレクトリ storePathRootDir に永続化します。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

同期のチェックポイントをリセットする場合は、永続的なチェックポイントファイルを削除する必要があります。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

スタンドアロンモードでコネクタワーカーを開始する

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

tips: 必要に応じて、docker/connect/bin/runconnect.sh を変更してJVMのスタートアップパラメータを調整できます。

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

スタートアップログファイルを参照するには

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

ランタイムが正常に開始されると、ログファイルに次のように表示されます。

スタンドアロンワーカーの起動が成功しました。

tail -fコマンドのログ追跡モードを終了するには、Ctrl + Cキーの組み合わせを押します。

4. ソースコネクタを開始する

ソースファイルを作成し、テストデータを書き込む

mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt

注意: 空行があってはいけません (空行があるとデモプログラムはエラーを起こします)。ソースコネクタはソースファイルを読み込み続け、各データ行をメッセージ本文に変換して、RocketMQに送信し、シンクコネクタが消費できるようにします。

ソースコネクタを開始する

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'

curlリクエストがステータス200を返すと、正常に作成されたことを示します。応答の例

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}

ログファイルを参照する

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

以下のログが表示された場合は、ファイルソースコネクタが正常に開始されたことを意味します。

connectorファイルSourceConnectorが開始され、ターゲットの状態STARTEDに正常に設定されました!!

ソースコネクタの構成手順

キーnull可能デフォルト説明
connector.classfalseConnectorインターフェースを実装するクラス名 (パッケージ名を含む)
filenamefalseソースファイルの名前 (絶対パスを使用することを推奨)
connect.topicnamefalseファイルデータを同期するために必要なトピック

5. シンクコネクタを開始する

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'

curlリクエストがステータス200を返すと、正常に作成されたことを示します。応答の例

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}

ログファイルを参照する

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

以下のログが表示された場合は、ファイルシンクコネクタが正常に開始されたことを意味します。

connectorファイルSinkConnectorが開始され、ターゲットの状態STARTEDに正常に設定されました!!

シンクコネクタが宛先ファイルにデータを書き込んだかどうかを確認する

cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

test-sink-file.txtファイルが生成され、その内容がtest-source-file.txtと同じ場合、プロセス全体が正常に実行されていることを意味します。

ソースファイルtest-source-file.txtにテストデータの書き込みを続ける

cd /Users/YourUsername/rocketmqconnect/

echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt

# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

注意: rocketmq-connect-sampleはRocketMQトピックとの間でメッセージを送受信する際に通常メッセージを使用しているため、ファイルの内容の順序は異なる場合があります。これは順序付きメッセージとは異なり、通常メッセージを消費しても順序が保証されません。

シンクコネクタの構成手順

キーnull可能デフォルト説明
connector.classfalseConnectorインターフェースを実装するクラス名 (パッケージ名を含む)
filenamefalseシンクはデータをプルし、ファイルに保存します (絶対パスを使用することを推奨)
connect.topicnamesfalseシンクが処理する必要があるデータメッセージのトピック

ヒント: サンプルrocketmq-connect-sampleの構成ファイル手順は参考用であり、ソース/シンクコネクタによって構成が異なるため、特定のソース/シンクコネクタを参照してください。

6. コネクタの停止

コネクタを停止するための RESTful コマンド形式は、http://(ワーカー IP):(ポート)/connectors/(コネクタ名)/stop です。

デモの 2 つのコネクタを停止するには、以下のコマンドを使用できます。

curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop

curl コマンドがステータス 200 を返した場合、コネクタが正常に停止されたことを示します。応答例

{"status":200,"body":"Connector[fileSinkConnector]deleted successfully"}

次のログメッセージが表示された場合、ファイルシンクコネクタが正常にシャットダウンされたことを意味します。

tail -100f ~/logs/rocketmqconnect/connect_default.log

connectorName:fileSinkConnector に対するシャットダウンが完了しました。

7. ワーカープロセスの停止

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh

8. ログディレクトリ

以下のコマンドを使用してログディレクトリを表示できます。

ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect

9. 設定ファイルの指示

使用方法に基づいて、RESTful ポート、storeRoot パス、ネームサーバーアドレス、およびその他の情報を変更します。

設定ファイルの例を次に示します。

#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1

# Http prot for user to access REST API
httpPort=8082

# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876

# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=

storePathRootDir 設定の説明

スタンドアロンモードでは、RocketMQ Connect は同期チェックポイント情報を storePathRootDir で指定されたローカルファイルディレクトリに永続化します。永続的なファイルには次のものが含まれます。

キー説明
connectorConfig.jsonコネクタコンフィギュレーション永続化ファイル
position.jsonソースコネクトデータ処理進捗永続化ファイル
taskConfig.jsonタスクコンフィギュレーション永続化ファイル
offset.jsonシンクコネクトデータ使用進捗永続化ファイル
connectorStatus.jsonコネクタステータス永続化ファイル
taskStatus.jsonタスクステータス永続化ファイル